package com.itrip.log.module.collect;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.itrip.log.core.BootstrapServer;
import com.itrip.log.core.ReaderBean;
import com.itrip.log.core.ThreadPools;
import com.itrip.log.module.db.domain.Dictionaries;
import com.itrip.log.module.db.domain.HostBean;
import com.itrip.log.module.db.server.ConfigManager;
import com.itrip.log.module.handler.HandlerModule;
import com.itrip.log.util.Util;
import com.jcraft.jsch.Channel;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;

/**
 * Function:远程tail -f 采集器
 *
 * @date:2016年10月19日/下午8:12:57
 * @Author:coder_czp@126.com
 * @version:1.0
 */
public class TailfInputStream implements IRemoteLogInputStream {

	private static final Logger log = LoggerFactory.getLogger(TailfInputStream.class);
	private AtomicInteger retryTime = new AtomicInteger();
	private HandlerModule logModule;
	private boolean isRunning = true;
	private ConfigManager cfgManager;
	private Session session;
	private Channel channel;
	private HostBean host;

	public TailfInputStream(HostBean host, ConfigManager cfgManager) {
		this.cfgManager = cfgManager;
		this.host = host;
	}

	@Override
	public String name() {
		return host.getText();
	}

	@Override
	public HostBean getRemoteHost() {
		return host;
	}

	public void start() {
		log.info("FileMonitor is running:{}", host);
		isRunning = true;
		ThreadPools.getInstance().startThread(name(), this);
	}

	public void stop() {
		isRunning = false;

		if(session!=null&& session.isConnected()){
			session.disconnect();
		}
		if (channel != null && channel.isConnected()) {
			channel.disconnect();
		}
		
		
	}

	@Override
	public void run() {
		try {
			String cmd = buldMonitorCmd();
			PipedOutputStream src = new PipedOutputStream();
			PipedOutputStream srcSend = new PipedOutputStream();
			PipedInputStream readResp = new PipedInputStream(src);
			PipedInputStream pisSend = new PipedInputStream(srcSend);

			JSch jsch = new JSch();
			Session session = jsch.getSession(host.getUserName(), host.getHost(), host.getPort());
			session.setConfig("StrictHostKeyChecking", "no");
			session.setPassword(Util.decrypt(host.getPwd()));
			session.connect(30000);

			channel = session.openChannel("shell");
			channel.setInputStream(pisSend);
			channel.setOutputStream(src);
			channel.connect(5000);
			sendCommad(srcSend, cmd);

			String line = null;
			String lastFile = null;
			BufferedReader bufRead = new BufferedReader(new InputStreamReader(readResp));

			String hostName = host.getText();
			while (isRunning && (line = bufRead.readLine()) != null) {
				lastFile = getFileName(line, lastFile);
				if (lastFile == null || line.length() == 0) {
					log.debug("error file {} {} {}", host, lastFile, line);
					continue;
				}
				ReaderBean bean = new ReaderBean();
				bean.setHost(hostName);
				bean.setFile(lastFile);
				bean.setLine(line);
				logModule.handle(bean);
			}
			bufRead.close();
			log.info("FileMonitor {} exit", host);
		} catch (Exception e) {
			log.error("error ,will reconn after 10s {}", host, e);
			reconntionServerWhenError();
		}
	}

	// tail: /xxx/xxx/gc.log: file truncated
	// tail: `/var/logs/error.log' has appeared; following end of new file
	private String getFileName(String line, String lastFile) {
		int splitSize = " ==>".length();
		if (line.startsWith("==>") || (line.startsWith("tail:")
				&& (line.endsWith("file truncated") || line.endsWith("following end of new file")))) {
			lastFile = line.substring(splitSize, line.length() - splitSize).trim().toLowerCase();
		}
		return lastFile;
	}

	private void reconntionServerWhenError() {
		try {
			if (retryTime.getAndIncrement() < 10) {
				log.info("reconntion:{},time:{}", host,retryTime.get());
				Thread.sleep(10000);
				this.stop();
				this.start();
			} else {
				retryTime.set(0);
				log.error("reconn 10 times,but can't conn so stop reconn");
			}
		} catch (Exception e1) {
			log.error("error when reconn {}", host, e1);
		}

	}

	private String buldMonitorCmd() {
		StringBuffer cmd = new StringBuffer("tail -n 1 -F ");
		for (Dictionaries cfg : cfgManager.loadMonitorFiles(host)) {
			String filePath = cfg.getValue();
			if (cmd.indexOf(filePath) == -1) {
				cmd.append(filePath).append(" ");
				log.info("monitor:{}", filePath);
			}
		}
		return cmd.toString();
	}

	private void sendCommad(PipedOutputStream out, String cmd) throws IOException {
		out.write(cmd.getBytes());
		out.write('\n');
		out.flush();
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + ((host == null) ? 0 : host.hashCode());
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		TailfInputStream other = (TailfInputStream) obj;
		if (host == null) {
			if (other.host != null)
				return false;
		} else if (!host.equals(other.host))
			return false;
		return true;
	}

	@Override
	public void setBoostServer(BootstrapServer server) {
		logModule = server.getModule(HandlerModule.class);
	}
}
